向量数据库入门实践
学习目标
- 理解向量数据库的基本概念和应用场景
- 了解主流向量数据库的特点和优劣势
- 掌握使用Chroma、FAISS和Milvus构建向量检索系统的方法
- 学习如何将向量数据库与大语言模型集成
- 通过实战项目熟悉向量数据库的实际应用
向量数据库简介
向量数据库是专门为存储、管理和检索向量数据而设计的数据库系统。传统数据库针对结构化数据的存储和查询进行了优化,而向量数据库则专注于高维向量的高效索引和相似性搜索。
向量数据库与传统数据库的区别
特性 | 传统数据库(如MySQL) | 向量数据库(如Milvus) |
---|---|---|
数据类型 | 结构化数据(数字、文本、日期等) | 向量(高维数值数组) |
查询方式 | 精确匹配、范围查询 | 相似度搜索(近似最近邻查询) |
索引结构 | B-Tree、Hash等 | HNSW、IVF、PQ等专用向量索引 |
查询语言 | SQL | API调用、特定查询语言 |
主要应用 | 事务处理、数据分析 | 语义搜索、推荐系统、多模态检索 |
向量数据库的主要功能
- 向量存储:保存嵌入向量及其元数据
- 高效索引:构建特殊索引结构,支持快速近似最近邻(ANN)搜索
- 相似性搜索:根据余弦相似度、欧几里得距离等度量方法查找相似向量
- 过滤和排序:支持基于元数据的过滤和排序
- 向量操作:支持向量增删改查和批量操作
主流向量数据库对比
向量数据库是专门设计用于存储、管理和检索向量数据的数据库系统。通过特殊的索引结构和相似度计算方法,可以高效地进行近似最近邻(ANN)搜索,是构建RAG(检索增强生成)系统的重要基础设施。
下表提供了主流向量数据库的对比:
名称 | 类型 | 主要特点 | 核心索引 | 适用场景 | 集成难度 |
---|---|---|---|---|---|
Pinecone | 云服务,商业 | 完全托管,自动扩展 | HNSW,IVF | 企业应用,无运维需求 | 低 |
Weaviate | 开源+商业 | 多模态,GraphQL接口 | HNSW | 知识图谱,多模态搜索 | 中 |
Milvus | 开源 | 分布式架构,高扩展性 | HNSW,IVF,PQ | 大规模生产环境 | 中高 |
Qdrant | 开源 | Rust开发,强大过滤 | HNSW | 复杂过滤条件应用 | 中 |
Chroma | 开源 | 轻量级,Python优先 | HNSW | 开发原型,小型应用 | 低 |
FAISS | 开源库 | 高性能,GPU加速 | 多种索引 | 大规模数据集,性能敏感 | 高 |
Vespa | 开源 | 综合搜索平台 | HNSW | 搜索与向量混合需求 | 高 |
Redis | 开源+模块 | 低延迟,内存存储 | HNSW | 实时应用,中小规模数据 | 低 |
向量索引方法简介
HNSW (分层可导航小世界图): 构建多层图结构实现快速导航,检索速度极快但内存消耗大,适合对查询速度要求高的场景
IVF (倒排文件索引): 将向量空间分割为多个聚类进行搜索,构建速度较快、内存消耗适中,但召回率略低
PQ (乘积量化): 通过将向量分解并量化来压缩存储空间,极大节省内存但有损精度,适合超大规模数据集
ANNOY: 使用随机投影树构建索引,可将索引保存到磁盘,适合需要持久化索引的场景
混合索引: 结合多种索引方法的优势,如IVF+PQ兼顾速度和内存,IVF+HNSW结合粗筛和精筛效果
选型关键考量因素
选择合适的向量数据库需要考虑以下关键因素:
1. 数据规模与性能需求
- 小型应用(百万级向量):Chroma, FAISS(单机)
- 中型应用(千万级向量):Qdrant, Pinecone, Redis
- 大型应用(亿级以上向量):Milvus, Vespa, Weaviate企业版
2. 部署与运维复杂度
- 零运维需求:Pinecone等云服务
- 轻量级部署:Chroma, Redis, FAISS
- 自托管分布式:Milvus, Qdrant, Vespa
3. RAG系统集成便捷性
RAG系统通常需要以下流程:文档嵌入→存储→检索→与LLM集成。不同向量数据库在这一流程中的表现:
- 最易集成:Chroma(与LangChain/LlamaIndex深度集成)
- 简洁API:Pinecone, Qdrant
- 复杂但灵活:Milvus, Vespa
4. 向量索引技术
向量数据库的核心是索引技术,主流索引方法包括:
- HNSW(分层可导航小世界图):建立多层图形结构,平衡速度与准确性
- IVF(倒排文件索引):将向量空间分割为多个聚类,提高检索效率
- PQ(乘积量化):将高维向量分解存储,降低内存使用
- ANNOY:使用随机超平面构建树形结构,平衡内存与速度
通过综合考虑上述因素,可以选择最适合特定应用场景的向量数据库。
现代嵌入模型
现代嵌入模型(如BERT、RoBERTa等)采用了更复杂的方法:
- 使用深度神经网络,而不是简单的向量查找表
- 考虑上下文信息,同一个词在不同语境下产生不同的向量
- 采用预训练+微调的方式,先在大量通用文本上训练,再在特定任务上微调
- 引入自监督学习任务,如掩码语言模型(预测被遮挡的词)
这些先进的嵌入模型能够更好地捕捉文本的语义信息,为向量数据库提供高质量的向量表示。
实践一:使用Chroma构建简单的文档检索系统
Chroma是一个轻量级、易于使用的向量数据库,特别适合原型开发和小型应用。它提供了简洁的Python API,可以轻松与其他工具集成。
安装Chroma
方法1:使用pip安装(最简单)
pip install chromadb
方法2:使用Docker安装(推荐用于生产环境)
# 拉取Chroma镜像
docker pull ghcr.io/chroma-core/chroma:latest
# 创建持久化目录
mkdir -p ~/.chroma/data
# 运行Chroma容器
docker run -d \
-p 8000:8000 \
-v ~/.chroma/data:/chroma/data \
--name chroma \
ghcr.io/chroma-core/chroma:latest
客户端连接(在Python中):
import chromadb
# 连接到Docker容器中的Chroma服务
client = chromadb.HttpClient(host="localhost", port=8000)
方法3:使用Docker Compose(适合多容器部署)
创建一个docker-compose.yml
文件:
version: '3'
services:
chroma:
image: ghcr.io/chroma-core/chroma:latest
volumes:
- chroma_data:/chroma/data
ports:
- "8000:8000"
restart: unless-stopped
volumes:
chroma_data:
运行命令:
docker-compose up -d
基本使用方法
import chromadb
from chromadb.utils import embedding_functions
# 创建客户端
client = chromadb.Client()
# 创建集合(可以指定嵌入函数)
# 这里使用默认的嵌入函数,实际应用中应选择合适的模型
embedding_function = embedding_functions.DefaultEmbeddingFunction()
collection = client.create_collection(
name="documents",
embedding_function=embedding_function
)
# 添加文档
documents = [
"Python是一种易于学习的编程语言,广泛应用于数据分析和AI领域。",
"向量数据库专门用于存储和检索向量数据,支持相似性搜索。",
"大语言模型可以生成文本、回答问题,但存在知识截止日期的限制。",
"RAG技术结合了检索和生成功能,提高了大语言模型回答的准确性和时效性。"
]
# 添加数据到集合
collection.add(
documents=documents,
ids=["doc1", "doc2", "doc3", "doc4"]
)
# 查询与"向量数据库"相似的文档
results = collection.query(
query_texts=["向量数据库的应用"],
n_results=2
)
print("查询结果:")
for i, doc_id in enumerate(results['ids'][0]):
doc_text = results['documents'][0][i]
print(f"文档ID: {doc_id}")
print(f"文档内容: {doc_text}")
print("-" * 50)
使用自定义嵌入模型
在实际应用中,选择合适的嵌入模型对检索质量至关重要。下面演示如何使用Sentence Transformers作为嵌入模型:
from sentence_transformers import SentenceTransformer
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
# 创建嵌入函数
embedding_function = SentenceTransformerEmbeddingFunction(
model_name="paraphrase-multilingual-MiniLM-L12-v2" # 多语言模型
)
# 创建集合
collection = client.create_collection(
name="multilingual_docs",
embedding_function=embedding_function
)
# 添加多语言文档
documents = [
"Python是一种易于学习的编程语言,广泛应用于数据分析和AI领域。",
"Python is a programming language that is easy to learn and widely used in data analysis and AI.",
"向量数据库专门用于存储和检索向量数据,支持相似性搜索。",
"Vector databases are specifically designed for storing and retrieving vector data, supporting similarity search."
]
collection.add(
documents=documents,
ids=["zh_doc1", "en_doc1", "zh_doc2", "en_doc2"]
)
# 跨语言查询
results = collection.query(
query_texts=["Vector database applications"],
n_results=2
)
print("查询结果:")
for i, doc_id in enumerate(results['ids'][0]):
doc_text = results['documents'][0][i]
print(f"文档ID: {doc_id}")
print(f"文档内容: {doc_text}")
print("-" * 50)
使用元数据和过滤
Chroma支持为每个文档添加元数据,并基于元数据进行过滤:
# 添加带元数据的文档
collection.add(
documents=[
"Python适合初学者和专业开发者使用。",
"机器学习是人工智能的一个子领域。",
"向量数据库可以高效存储和检索高维向量。",
"自然语言处理技术使计算机能够理解人类语言。"
],
metadatas=[
{"category": "programming", "difficulty": "beginner"},
{"category": "ai", "difficulty": "intermediate"},
{"category": "database", "difficulty": "advanced"},
{"category": "nlp", "difficulty": "intermediate"}
],
ids=["doc5", "doc6", "doc7", "doc8"]
)
# 使用元数据过滤查询
results = collection.query(
query_texts=["人工智能技术"],
n_results=3,
where={"difficulty": "intermediate"} # 只返回中级难度的文档
)
print("过滤查询结果:")
for i, doc_id in enumerate(results['ids'][0]):
doc_text = results['documents'][0][i]
print(f"文档ID: {doc_id}")
print(f"文档内容: {doc_text}")
print("-" * 50)
持久化存储
默认情况下,Chroma使用内存存储,数据会在程序结束后丢失。为了持久化数据,可以使用PersistentClient:
persistent_client = chromadb.PersistentClient(path="./chroma_db")
persistent_collection = persistent_client.create_collection(name="persistent_docs")
# 添加数据
persistent_collection.add(
documents=["这是一个持久化存储的文档示例"],
ids=["persistent_doc1"]
)
# 数据会保存在./chroma_db目录中
实践二:使用FAISS构建高性能向量检索系统
FAISS (Facebook AI Similarity Search) 是一个用于高效相似性搜索和密集向量聚类的库,专为处理大规模向量集合而设计。
安装FAISS
方法1:使用pip安装
# 仅CPU版本
pip install faiss-cpu
# GPU版本(需要CUDA支持)
pip install faiss-gpu
方法2:使用conda安装(推荐,特别是GPU版本)
# CPU版本
conda install -c conda-forge faiss-cpu
# GPU版本
conda install -c conda-forge faiss-gpu
方法3:使用Docker(适合生产环境)
# 拉取官方FAISS镜像
docker pull atreiding/faiss:latest
# 运行容器
docker run -it --rm atreiding/faiss:latest
基本使用示例
import numpy as np
import faiss
# 创建随机向量数据集作为示例
dimension = 128 # 向量维度
num_vectors = 1000 # 向量数量
vectors = np.random.random((num_vectors, dimension)).astype('float32')
# 创建索引(使用L2距离)
index = faiss.IndexFlatL2(dimension)
# 添加向量到索引
index.add(vectors)
# 创建查询向量
query_vector = np.random.random((1, dimension)).astype('float32')
# 执行搜索,获取最近的5个向量
k = 5
distances, indices = index.search(query_vector, k)
print("搜索结果索引:", indices)
print("距离:", distances)
高级索引类型
FAISS提供多种索引类型,适合不同规模和需求:
# 基础精确搜索,适合小规模数据(<1M)
index_flat = faiss.IndexFlatL2(dimension)
# 倒排文件索引,适合中等规模数据(<10M)
nlist = 100 # 聚类中心数量
index_ivf = faiss.IndexIVFFlat(index_flat, dimension, nlist)
index_ivf.train(vectors) # IVF索引需要训练
# 乘积量化 + IVF,适合大规模数据
index_ivfpq = faiss.IndexIVFPQ(index_flat, dimension, nlist, 8, 8)
index_ivfpq.train(vectors)
# HNSW索引,高速搜索
M = 16 # 连接数
index_hnsw = faiss.IndexHNSWFlat(dimension, M)
与句向量嵌入结合使用
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
# 加载预训练模型
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
# 示例文本
texts = [
"人工智能正在改变各行各业",
"机器学习是AI的一个子领域",
"深度学习基于神经网络",
"自然语言处理让机器理解人类语言",
"计算机视觉使机器能够"看见"世界"
]
# 生成嵌入
embeddings = model.encode(texts)
embeddings = np.array(embeddings).astype('float32')
# 创建索引
dimension = embeddings.shape[1]
index = faiss.IndexFlatIP(dimension) # 内积相似度(余弦相似度需要归一化向量)
# 添加向量
index.add(embeddings)
# 查询
query = "AI技术的应用"
query_embedding = model.encode([query])[0].astype('float32').reshape(1, -1)
# 搜索
k = 2
distances, indices = index.search(query_embedding, k)
print(f"查询: '{query}'")
for i, idx in enumerate(indices[0]):
print(f"结果 {i+1}: {texts[idx]}, 相似度: {distances[0][i]}")
实践三:使用Milvus构建分布式向量检索系统
Milvus是一个开源的向量数据库,专为大规模向量检索设计,支持分布式部署和水平扩展。
安装Milvus
方法1:使用Docker快速部署(单机版)
# 创建存储目录
mkdir -p ~/milvus/data
mkdir -p ~/milvus/conf
# 下载配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml -O docker-compose.yml
# 启动Milvus
docker-compose up -d
方法2:使用Docker Compose(集群版)
# 下载集群版配置
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-cluster-docker-compose.yml -O docker-compose.yml
# 启动Milvus集群
docker-compose up -d
方法3:使用Helm在Kubernetes上部署
# 添加Milvus Helm仓库
helm repo add milvus https://milvus-io.github.io/milvus-helm/
# 更新仓库
helm repo update
# 安装Milvus(单机版)
helm install my-release milvus/milvus --set cluster.enabled=false
# 安装Milvus(集群版)
helm install my-release milvus/milvus --set cluster.enabled=true
验证安装
# 检查容器运行状态
docker-compose ps
# 访问Milvus Web界面
# 在浏览器中打开 http://localhost:9001
安装Python客户端
pip install pymilvus
基本使用方法
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np
# 连接到Milvus服务
connections.connect("default", host="localhost", port="19530")
# 定义集合字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000)
]
# 创建集合模式
schema = CollectionSchema(fields=fields, description="文档检索")
# 创建集合
collection_name = "document_collection"
if utility.has_collection(collection_name):
collection = Collection(name=collection_name)
else:
collection = Collection(name=collection_name, schema=schema)
# 创建索引
index_params = {
"metric_type": "COSINE", # 余弦相似度
"index_type": "HNSW", # HNSW索引
"params": {
"M": 16, # HNSW图的出度参数
"efConstruction": 200 # 构建时的搜索深度
}
}
# 为嵌入字段创建索引
collection.create_index(field_name="embedding", index_params=index_params)
添加和检索数据
from sentence_transformers import SentenceTransformer
# 准备文本数据
documents = [
"Milvus是一个专门为向量相似性搜索和AI应用设计的数据库",
"向量数据库支持高效的相似性搜索和近似最近邻查询",
"与传统数据库不同,向量数据库优化了对高维向量数据的管理",
"人工智能应用中经常需要处理特征向量和嵌入表示",
"嵌入模型将文本转换为高维向量,捕捉其语义含义",
"大语言模型结合向量检索可以实现更精准的问答系统"
]
# 加载嵌入模型
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
# 生成嵌入
embeddings = model.encode(documents)
# 准备插入的数据
entities = [
{"embedding": embeddings.tolist(), "text": documents}
]
# 插入数据
collection.insert(entities)
# 加载集合(使集合对搜索可见)
collection.load()
# 执行向量搜索
query = "向量数据库的应用"
query_embedding = model.encode([query])[0].tolist()
search_params = {
"metric_type": "COSINE",
"params": {"ef": 100} # 搜索时的候选池大小
}
results = collection.search(
data=[query_embedding], # 查询向量
anns_field="embedding", # 搜索的字段
param=search_params, # 搜索参数
limit=3, # 返回前3个结果
output_fields=["text"] # 返回的字段
)
# 显示结果
print(f"查询: '{query}'")
for i, hits in enumerate(results):
print(f"查询结果 {i+1}:")
for hit in hits:
print(f"ID: {hit.id}, 距离: {hit.distance}")
print(f"文本: {hit.entity.get('text')}")
print("-" * 50)
使用元数据过滤
Milvus支持向量搜索与标量过滤的结合:
# 添加带分类的文档
documents_with_category = [
{"text": "Python是一种流行的编程语言", "category": "programming"},
{"text": "JavaScript用于网页交互", "category": "programming"},
{"text": "机器学习使用算法从数据中学习", "category": "ai"},
{"text": "深度学习是机器学习的一个子领域", "category": "ai"},
{"text": "向量数据库优化了向量检索操作", "category": "database"},
{"text": "传统关系型数据库使用SQL查询", "category": "database"}
]
# 修改集合模式以包含分类字段
category_field = FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=100)
fields.append(category_field)
# 重新创建集合(实际应用中应考虑避免重复创建)
new_collection_name = "categorized_documents"
new_schema = CollectionSchema(fields=fields, description="带分类的文档检索")
if utility.has_collection(new_collection_name):
utility.drop_collection(new_collection_name)
new_collection = Collection(name=new_collection_name, schema=new_schema)
# 为嵌入字段创建索引
new_collection.create_index(field_name="embedding", index_params=index_params)
# 生成嵌入并插入数据
for doc in documents_with_category:
embedding = model.encode([doc["text"]])[0].tolist()
new_collection.insert([{
"embedding": embedding,
"text": doc["text"],
"category": doc["category"]
}])
# 加载集合
new_collection.load()
# 带过滤条件的搜索(只搜索AI类别的文档)
query = "机器学习技术"
query_embedding = model.encode([query])[0].tolist()
results = new_collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=2,
expr="category == 'ai'", # 表达式过滤条件
output_fields=["text", "category"]
)
# 显示结果
print(f"带过滤的查询: '{query}' (类别: ai)")
for hits in results:
for hit in hits:
print(f"ID: {hit.id}, 距离: {hit.distance}")
print(f"文本: {hit.entity.get('text')}")
print(f"类别: {hit.entity.get('category')}")
print("-" * 50)
实战项目:构建语义文档检索系统
下面我们将整合前面学习的内容,构建一个完整的语义文档检索系统:
import os
import argparse
import sys
from pathlib import Path
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
def extract_text_from_file(file_path):
"""从文件中提取文本内容"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
return ""
def create_document_chunks(text, chunk_size=1000, overlap=200):
"""将长文本分割为重叠的块"""
chunks = []
if len(text) <= chunk_size:
chunks.append(text)
else:
for i in range(0, len(text), chunk_size - overlap):
chunk = text[i:i + chunk_size]
if len(chunk) >= 200: # 避免太小的块
chunks.append(chunk)
return chunks
def build_vector_db(docs_dir, db_path):
"""从文档目录构建向量数据库"""
# 初始化Chroma客户端
client = chromadb.PersistentClient(path=db_path)
# 使用多语言模型作为嵌入函数
embedding_function = SentenceTransformerEmbeddingFunction(
model_name="paraphrase-multilingual-MiniLM-L12-v2"
)
# 创建或获取集合
if "documents" in client.list_collections():
collection = client.get_collection(
name="documents",
embedding_function=embedding_function
)
print("使用现有集合")
else:
collection = client.create_collection(
name="documents",
embedding_function=embedding_function
)
print("创建新集合")
# 处理文档
documents = []
metadatas = []
ids = []
doc_id = 0
for root, _, files in os.walk(docs_dir):
for filename in files:
if filename.endswith('.txt') or filename.endswith('.md'):
file_path = os.path.join(root, filename)
content = extract_text_from_file(file_path)
# 分块处理长文档
chunks = create_document_chunks(content)
for i, chunk in enumerate(chunks):
documents.append(chunk)
rel_path = os.path.relpath(file_path, docs_dir)
metadatas.append({
"source": rel_path,
"filename": filename,
"chunk": i
})
ids.append(f"doc_{doc_id}_{i}")
doc_id += 1
print(f"已处理: {file_path} -> {len(chunks)}个块")
# 批量添加到集合
if documents:
# Chroma有批量大小限制,分批添加
batch_size = 100
for i in range(0, len(documents), batch_size):
end = min(i + batch_size, len(documents))
collection.add(
documents=documents[i:end],
metadatas=metadatas[i:end],
ids=ids[i:end]
)
print(f"向量数据库构建完成! 共添加了 {len(documents)} 个文档块。")
else:
print("未找到可处理的文档")
return collection
def search_documents(collection, query, top_k=5):
"""搜索文档"""
results = collection.query(
query_texts=[query],
n_results=top_k
)
search_results = []
if results['documents'] and results['documents'][0]:
for i, (doc, metadata, doc_id) in enumerate(zip(
results['documents'][0],
results['metadatas'][0],
results['ids'][0]
)):
result = {
'content': doc,
'metadata': metadata,
'id': doc_id,
'relevance': results['distances'][0][i] if 'distances' in results else None
}
search_results.append(result)
return search_results
def display_results(results, query):
"""显示搜索结果"""
print(f"\n找到 {len(results)} 个匹配 '{query}' 的文档块:")
print("="*80)
for i, result in enumerate(results):
source = result['metadata'].get('source', 'Unknown')
print(f"[{i+1}] 来源: {source} (块 {result['metadata'].get('chunk', 0)})")
if result['relevance'] is not None:
print(f"相关度: {1 - result['relevance']:.4f}")
# 显示部分内容
content = result['content']
if len(content) > 300:
content = content[:300] + "..."
print(f"内容: {content}")
print("-"*80)
def interactive_search(collection):
"""交互式搜索界面"""
print("\n欢迎使用语义文档检索系统!")
print("输入 'q' 退出")
while True:
query = input("\n请输入搜索关键词: ")
if query.lower() == 'q':
break
results = search_documents(collection, query)
display_results(results, query)
if results:
while True:
detail = input("\n输入编号查看完整内容 (或'n'继续): ")
if detail.lower() == 'n':
break
try:
idx = int(detail) - 1
if 0 <= idx < len(results):
print("\n" + "="*80)
print(f"来源: {results[idx]['metadata'].get('source')}")
print("完整内容:")
print(results[idx]['content'])
print("="*80)
else:
print("无效的编号")
except ValueError:
print("请输入有效的数字或'n'")
def main():
parser = argparse.ArgumentParser(description='语义文档检索系统')
parser.add_argument('--docs', type=str, help='要索引的文档目录')
parser.add_argument('--db', type=str, default='./vector_db', help='向量数据库存储路径')
parser.add_argument('--rebuild', action='store_true', help='重建向量数据库')
args = parser.parse_args()
# 验证向量数据库是否存在
db_exists = os.path.exists(args.db) and os.path.isdir(args.db)
if not db_exists or args.rebuild:
if not args.docs:
print("错误: 向量数据库不存在,请提供文档目录以构建数据库")
parser.print_help()
sys.exit(1)
# 构建向量数据库
print(f"正在为目录 '{args.docs}' 构建向量数据库...")
collection = build_vector_db(args.docs, args.db)
else:
# 打开现有向量数据库
print(f"打开现有向量数据库: {args.db}")
client = chromadb.PersistentClient(path=args.db)
embedding_function = SentenceTransformerEmbeddingFunction(
model_name="paraphrase-multilingual-MiniLM-L12-v2"
)
collection = client.get_collection(
name="documents",
embedding_function=embedding_function
)
# 启动交互式搜索
interactive_search(collection)
if __name__ == "__main__":
main()
使用方法
# 构建向量数据库
python semantic_search.py --docs /path/to/your/documents --db ./vector_db
# 使用现有向量数据库
python semantic_search.py --db ./vector_db
# 重建向量数据库
python semantic_search.py --docs /path/to/your/documents --db ./vector_db --rebuild
小结
在本节中,我们学习了:
- 向量数据库的基本概念和主要功能
- 主流向量数据库的特点和应用场景
- 使用Chroma构建简单的向量检索系统
- 使用FAISS实现高性能向量检索
- 使用Milvus构建分布式向量检索系统
- 实现一个完整的语义文档检索应用
向量数据库是大语言模型应用的重要基础设施,特别是在构建RAG(检索增强生成)系统时发挥关键作用。选择合适的向量数据库取决于具体的应用场景、数据规模和性能需求。
思考题
- 在选择向量数据库时,应考虑哪些关键因素?不同应用场景下,如何权衡这些因素?
- 文档分块策略(块大小、重叠程度)如何影响检索效果?如何为特定应用确定最佳的分块策略?
- 如何评估向量检索系统的质量?除了召回率和精确度外,还有哪些指标可以考虑?
- 向量数据库如何与传统数据库结合,构建更复杂的应用?
在下一节中,我们将学习如何将向量数据库与大语言模型集成,构建完整的RAG系统。